/*-------------------------------------------------------------------------
 *
 * maintenanced.c
 *	  Background worker run for each citus using database in a postgres
 *    cluster.
 *
 * This file provides infrastructure for launching exactly one background
 * worker for every database in which citus is used.  That background worker
 * can then perform work like deadlock detection, prepared transaction
 * recovery, and cleanup.
 *
 * Copyright (c) Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */

#include <time.h>

#include "postgres.h"

#include "miscadmin.h"
#include "pgstat.h"

#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/namespace.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_extension.h"
#include "catalog/pg_namespace.h"
#include "commands/async.h"
#include "commands/extension.h"
#include "common/hashfn.h"
#include "libpq/pqsignal.h"
#include "nodes/makefuncs.h"
#include "postmaster/bgworker.h"
#include "postmaster/postmaster.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lmgr.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"

#include "citus_version.h"
#include "pg_version_constants.h"

#include "distributed/background_jobs.h"
#include "distributed/background_worker_utils.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/distributed_deadlock_detection.h"
#include "distributed/maintenanced.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/resource_lock.h"
#include "distributed/shard_cleaner.h"
#include "distributed/statistics_collection.h"
#include "distributed/stats/query_stats.h"
#include "distributed/transaction_recovery.h"
#include "distributed/version_compat.h"

/*
 * Shared memory data for all maintenance workers.
 *
 * A single instance lives in shared memory; it is created and initialized by
 * MaintenanceDaemonShmemInit.
 */
typedef struct MaintenanceDaemonControlData
{
	/*
	 * LWLock tranche id and tranche name that "lock" below is initialized
	 * and registered with (see MaintenanceDaemonShmemInit).
	 */
	int trancheId;
	char *lockTrancheName;
	/*
	 * Lock protecting the shared memory state.  This is to be taken when
	 * looking up (shared mode) or inserting (exclusive mode) per-database
	 * data in MaintenanceDaemonDBHash.  Note that the code in this file
	 * currently always acquires it in exclusive mode.
	 */
	LWLock lock;
} MaintenanceDaemonControlData;


/*
 * Per database worker state.
 *
 * Entries live in MaintenanceDaemonDBHash and are keyed by databaseOid. All
 * fields after the key are zeroed when an entry is first created (see
 * GetMaintenanceDaemonDBHashEntry).
 */
typedef struct MaintenanceDaemonDBData
{
	/* hash key: database to run on */
	Oid databaseOid;

	/* information: which user to use */
	Oid userOid;

	/* pid of the running maintenance daemon, 0 when no daemon is registered */
	pid_t workerPid;

	/* true once a daemon has been registered or is running for the database */
	bool daemonStarted;

	/* set by TriggerNodeMetadataSync, consumed by the daemon's main loop */
	bool triggerNodeMetadataSync;
	Latch *latch; /* pointer to the background worker's latch */
} MaintenanceDaemonDBData;

/* config variable for distributed deadlock detection timeout; -1 disables it */
double DistributedDeadlockDetectionTimeoutFactor = 2.0;

/*
 * Intervals, in milliseconds, between runs of 2PC recovery, deferred shard
 * cleanup and the background task queue check in the main loop. A value that
 * is not greater than zero disables the corresponding task.
 */
int Recover2PCInterval = 60000;
int DeferShardDeleteInterval = 15000;
int BackgroundTaskQueueCheckInterval = 5000;

/* not referenced in this file; NOTE(review): presumably read by the task queue monitor */
int MaxBackgroundTaskExecutors = 1;

/* name of the designated main database; empty string means there is none */
char *MainDb = "";

/* config variables for metadata sync timeout */
int MetadataSyncInterval = 60000;
int MetadataSyncRetryInterval = 5000;

/* previously installed shmem_startup_hook, chained from MaintenanceDaemonShmemInit */
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;

/* shared control structure, set up in MaintenanceDaemonShmemInit */
static MaintenanceDaemonControlData *MaintenanceDaemonControl = NULL;

/*
 * Hash-table of workers, one entry for each database with citus
 * activated.
 */
static HTAB *MaintenanceDaemonDBHash;

/* error context callback pushed by the daemon in ConnectToDatabase */
static ErrorContextCallback errorCallback = { 0 };

/* flags set by the signal handlers and polled by the daemon's main loop */
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t got_SIGTERM = false;

/* set to true when becoming a maintenance daemon */
static bool IsMaintenanceDaemon = false;

/* forward declarations of file-local helpers */
static void MaintenanceDaemonSigTermHandler(SIGNAL_ARGS);
static void MaintenanceDaemonSigHupHandler(SIGNAL_ARGS);
static void MaintenanceDaemonShmemExit(int code, Datum arg);
static void MaintenanceDaemonErrorContext(void *arg);
static bool MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData);
static void WarnMaintenanceDaemonNotStarted(void);
static MaintenanceDaemonDBData * GetMaintenanceDaemonDBHashEntry(Oid databaseId,
																 bool *found);

/*
 * InitializeMaintenanceDaemon, called at server start, hooks the maintenance
 * daemon into shared memory startup: it remembers whichever
 * shmem_startup_hook was installed before and installs
 * MaintenanceDaemonShmemInit in its place. The remembered hook is invoked
 * from MaintenanceDaemonShmemInit, so hooks of other modules keep working.
 */
void
InitializeMaintenanceDaemon(void)
{
	/* save the old hook first, it is chained from MaintenanceDaemonShmemInit */
	prev_shmem_startup_hook = shmem_startup_hook;

	shmem_startup_hook = MaintenanceDaemonShmemInit;
}


/*
 * GetMaintenanceDaemonDBHashEntry searches the MaintenanceDaemonDBHash for the
 * given databaseId. It returns the entry if found, or creates a new entry and
 * initializes every field after the hash key with zeroes.
 *
 * *found is set to true when the entry already existed and to false when it
 * has just been created.
 *
 * Returns NULL, after writing a LOG message, when the entry does not exist
 * and could not be created (HASH_ENTER_NULL reports that by returning NULL
 * instead of raising an error).
 *
 * The callers in this file hold MaintenanceDaemonControl->lock in exclusive
 * mode while calling this function.
 */
static MaintenanceDaemonDBData *
GetMaintenanceDaemonDBHashEntry(Oid databaseId, bool *found)
{
	/*
	 * Look up the database that was asked for. Using MyDatabaseId here would
	 * silently ignore the argument and make the log message below report a
	 * different database than the one actually searched.
	 */
	MaintenanceDaemonDBData *dbData = (MaintenanceDaemonDBData *) hash_search(
		MaintenanceDaemonDBHash,
		&databaseId,
		HASH_ENTER_NULL,
		found);

	if (!dbData)
	{
		elog(LOG,
			 "cannot create or find the maintenance deamon hash entry for database %u",
			 databaseId);
		return NULL;
	}

	if (!*found)
	{
		/*
		 * Ensure the values in MaintenanceDaemonDBData are zero. The key
		 * (databaseOid) is the first member and has already been filled in
		 * by hash_search, so skip over it.
		 */
		memset(((char *) dbData) + sizeof(Oid), 0,
			   sizeof(MaintenanceDaemonDBData) - sizeof(Oid));
	}

	return dbData;
}


/*
 * InitializeMaintenanceDaemonForMainDb is called in _PG_Init
 * at which stage we are not in a transaction or have databaseOid
 */
void
InitializeMaintenanceDaemonForMainDb(void)
{
	if (strcmp(MainDb, "") == 0)
	{
		elog(LOG, "There is no designated Main database.");
		return;
	}

	CitusBackgroundWorkerConfig config = {
		.workerName = "Citus Maintenance Daemon for Main DB",
		.functionName = "CitusMaintenanceDaemonMain",
		.mainArg = (Datum) 0,
		.extensionOwner = InvalidOid,
		.needsNotification = false,
		.waitForStartup = false,
		.restartTime = CITUS_BGW_DEFAULT_RESTART_TIME,
		.startTime = CITUS_BGW_DEFAULT_START_TIME,
		.workerType = NULL, /* use default */
		.extraData = NULL,
		.extraDataSize = 0
	};
	BackgroundWorker worker;
	InitializeCitusBackgroundWorker(&worker, &config);
	RegisterBackgroundWorker(&worker);
}


/*
 * InitializeMaintenanceDaemonBackend, called at backend start and
 * configuration changes, is responsible for starting a per-database
 * maintenance worker if necessary.
 */
void
InitializeMaintenanceDaemonBackend(void)
{
	Oid extensionOwner = CitusExtensionOwner();
	bool found = false;

	LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);

	MaintenanceDaemonDBData *dbData = GetMaintenanceDaemonDBHashEntry(MyDatabaseId,
																	  &found);

	if (dbData == NULL)
	{
		WarnMaintenanceDaemonNotStarted();
		LWLockRelease(&MaintenanceDaemonControl->lock);
		return;
	}

	if (IsMaintenanceDaemon)
	{
		/*
		 * InitializeMaintenanceDaemonBackend is called by the maintenance daemon
		 * itself. In that case, we clearly don't need to start another maintenance
		 * daemon.
		 */
		LWLockRelease(&MaintenanceDaemonControl->lock);
		return;
	}

	if (!found || !dbData->daemonStarted)
	{
		Assert(dbData->workerPid == 0);

		char workerName[BGW_MAXLEN];

		SafeSnprintf(workerName, sizeof(workerName),
					 "Citus Maintenance Daemon: %u/%u",
					 MyDatabaseId, extensionOwner);

		CitusBackgroundWorkerConfig config = {
			.workerName = workerName,
			.functionName = "CitusMaintenanceDaemonMain",
			.mainArg = ObjectIdGetDatum(MyDatabaseId),
			.extensionOwner = extensionOwner,
			.needsNotification = true,
			.waitForStartup = true,
			.restartTime = CITUS_BGW_DEFAULT_RESTART_TIME,
			.startTime = CITUS_BGW_DEFAULT_START_TIME,
			.workerType = NULL, /* use default */
			.extraData = NULL,
			.extraDataSize = 0
		};
		BackgroundWorkerHandle *handle = RegisterCitusBackgroundWorker(&config);

		if (!handle)
		{
			WarnMaintenanceDaemonNotStarted();
			dbData->daemonStarted = false;
			LWLockRelease(&MaintenanceDaemonControl->lock);

			return;
		}

		dbData->daemonStarted = true;
		dbData->userOid = extensionOwner;
		dbData->workerPid = 0;
		dbData->triggerNodeMetadataSync = false;
		LWLockRelease(&MaintenanceDaemonControl->lock);

		pfree(handle);
	}
	else
	{
		Assert(dbData->daemonStarted);

		/*
		 * If owner of extension changed, wake up daemon. It'll notice and
		 * restart.
		 */
		if (dbData->userOid != extensionOwner)
		{
			dbData->userOid = extensionOwner;
			if (dbData->latch)
			{
				SetLatch(dbData->latch);
			}
		}
		LWLockRelease(&MaintenanceDaemonControl->lock);
	}
}


/*
 * WarnMaintenanceDaemonNotStarted emits a WARNING telling the user that the
 * maintenance background worker could not be started, together with a hint
 * about max_worker_processes.
 */
static void
WarnMaintenanceDaemonNotStarted(void)
{
	ereport(WARNING,
			(errmsg("could not start maintenance background worker"),
			 errhint("Increasing max_worker_processes might help.")));
}


/*
 * ConnectToDatabase connects to the database for the given databaseOid and
 * registers the calling process as that database's maintenance daemon in
 * MaintenanceDaemonDBHash. It returns the shared hash entry of the database.
 *
 * If databaseOid is 0, it connects to MainDb by name and then finds or
 * creates the hash entry. If a hash entry cannot be created for MainDb, it
 * exits the process requesting a restart.
 *
 * For regular databases the hash entry must already exist. If it does not,
 * the process exits without requesting a restart, since a subsequent backend
 * is expected to start the maintenance daemon.
 *
 * In both cases, if the hash entry already has a non-zero workerPid, the
 * process exits without requesting a restart since there is already a daemon
 * running.
 *
 * The control lock is held in exclusive mode from the start of the function
 * until the entry has been fully updated. Note that the proc_exit() calls
 * below happen while the lock is still held.
 */
static MaintenanceDaemonDBData *
ConnectToDatabase(Oid databaseOid)
{
	MaintenanceDaemonDBData *myDbData = NULL;


	/* true when we connected by name to MainDb (databaseOid was 0) */
	bool isMainDb = false;

	LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);


	if (databaseOid == 0)
	{
		char *databaseName = MainDb;

		/*
		 * Since we cannot query databaseOid without initializing Postgres
		 * first, connect to the database by name.
		 */
		BackgroundWorkerInitializeConnection(databaseName, NULL, 0);

		/*
		 * Now we have a valid MyDatabaseId.
		 * Insert the hash entry for the database to the Maintenance Deamon Hash.
		 */
		bool found = false;

		myDbData = GetMaintenanceDaemonDBHashEntry(MyDatabaseId, &found);

		if (!myDbData)
		{
			/*
			 * If an entry cannot be created,
			 * return code of 1 requests worker restart
			 * Since BackgroundWorker for the MainDb is only registered
			 * once during server startup, we need to retry.
			 */
			proc_exit(1);
		}

		if (found && myDbData->workerPid != 0)
		{
			/* Another maintenance daemon is running.*/

			proc_exit(0);
		}

		/* from here on behave as if we had been started for this database oid */
		databaseOid = MyDatabaseId;
		myDbData->userOid = GetSessionUserId();
		isMainDb = true;
	}
	else
	{
		/* regular database: only look the entry up, never create it here */
		myDbData = (MaintenanceDaemonDBData *)
				   hash_search(MaintenanceDaemonDBHash, &databaseOid,
							   HASH_FIND, NULL);

		if (!myDbData)
		{
			/*
			 * When the database crashes, background workers are restarted, but
			 * the state in shared memory is lost. In that case, we exit and
			 * wait for a session to call InitializeMaintenanceDaemonBackend
			 * to properly add it to the hash.
			 */

			proc_exit(0);
		}

		if (myDbData->workerPid != 0)
		{
			/*
			 * Another maintenance daemon is running. This usually happens because
			 * postgres restarts the daemon after an non-zero exit, and
			 * InitializeMaintenanceDaemonBackend started one before postgres did.
			 * In that case, the first one stays and the last one exits.
			 */

			proc_exit(0);
		}
	}

	/* make sure our registration is cleared from the hash entry on exit */
	before_shmem_exit(MaintenanceDaemonShmemExit, ObjectIdGetDatum(databaseOid));

	/*
	 * Signal that I am the maintenance daemon now.
	 *
	 * From this point, DROP DATABASE/EXTENSION will send a SIGTERM to me.
	 */
	myDbData->workerPid = MyProcPid;

	/*
	 * Signal that we are running. This in mainly needed in case of restart after
	 * an error, otherwise the daemonStarted flag is already true.
	 */
	myDbData->daemonStarted = true;

	/* wire up signals */
	pqsignal(SIGTERM, MaintenanceDaemonSigTermHandler);
	pqsignal(SIGHUP, MaintenanceDaemonSigHupHandler);
	BackgroundWorkerUnblockSignals();

	/* publish our latch so that other backends can wake us up */
	myDbData->latch = MyLatch;

	IsMaintenanceDaemon = true;

	LWLockRelease(&MaintenanceDaemonControl->lock);

	/* add the daemon's database and user to the context of our log messages */
	memset(&errorCallback, 0, sizeof(errorCallback));
	errorCallback.callback = MaintenanceDaemonErrorContext;
	errorCallback.arg = (void *) myDbData;
	errorCallback.previous = error_context_stack;
	error_context_stack = &errorCallback;

	elog(LOG, "starting maintenance daemon on database %u user %u",
		 databaseOid, myDbData->userOid);

	/* for MainDb the connection has already been established above */
	if (!isMainDb)
	{
		/* connect to database, after that we can actually access catalogs */
		BackgroundWorkerInitializeConnectionByOid(databaseOid, myDbData->userOid, 0);
	}

	return myDbData;
}


/*
 * CitusMaintenanceDaemonMain is the maintenance daemon's main routine, it'll
 * be started by the background worker infrastructure.  If it errors out,
 * it'll be restarted after a few seconds.
 *
 * main_arg carries the oid of the database to run on; 0 means "connect to
 * MainDb by name" (see ConnectToDatabase).
 *
 * After connecting, the daemon loops until SIGTERM is received. In every
 * iteration it runs whichever of the following tasks are enabled and due,
 * each in its own transaction and only if the citus extension can be locked
 * and is usable:
 *  - statistics collection (only when built with libcurl),
 *  - spawning a metadata sync background worker,
 *  - 2PC recovery,
 *  - distributed deadlock detection,
 *  - cleanup of orphaned resources,
 *  - purging of query statistics,
 *  - starting the background task queue monitor.
 * It then sleeps on its latch for at most "timeout" milliseconds, where
 * timeout is the smallest interval requested by the tasks that ran.
 *
 * On a regular exit of the loop a still tracked metadata sync worker is
 * terminated.
 */
void
CitusMaintenanceDaemonMain(Datum main_arg)
{
	Oid databaseOid = DatumGetObjectId(main_arg);
	TimestampTz nextStatsCollectionTime USED_WITH_LIBCURL_ONLY =
		TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 60 * 1000);
	bool retryStatsCollection USED_WITH_LIBCURL_ONLY = false;
	TimestampTz lastRecoveryTime = 0;
	TimestampTz lastShardCleanTime = 0;
	TimestampTz lastStatStatementsPurgeTime = 0;
	TimestampTz nextMetadataSyncTime = 0;

	/* state kept for the background tasks queue monitor */
	TimestampTz lastBackgroundTaskQueueCheck = GetCurrentTimestamp();
	BackgroundWorkerHandle *backgroundTasksQueueBgwHandle = NULL;
	bool backgroundTasksQueueWarnedForLock = false;


	/*
	 * We do metadata sync in a separate background worker. We need its
	 * handle to be able to check its status.
	 */
	BackgroundWorkerHandle *metadataSyncBgwHandle = NULL;

	MaintenanceDaemonDBData *myDbData = ConnectToDatabase(databaseOid);

	/* make worker recognizable in pg_stat_activity */
	pgstat_report_appname("Citus Maintenance Daemon");

	/*
	 * Terminate orphaned metadata sync daemons spawned from previously terminated
	 * or crashed maintenanced instances.
	 */
	SignalMetadataSyncDaemon(MyDatabaseId, SIGTERM);

	/* enter main loop */
	while (!got_SIGTERM)
	{
		int latchFlags = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH;
		double timeout = 10000.0; /* use this if the deadlock detection is disabled */
		bool foundDeadlock = false;

		CHECK_FOR_INTERRUPTS();

		CitusTableCacheFlushInvalidatedEntries();

		/*
		 * XXX: Each task should clear the metadata cache before every iteration
		 * by calling InvalidateMetadataSystemCache(), because otherwise it
		 * might contain stale OIDs. It appears that in some cases invalidation
		 * messages for a DROP EXTENSION may arrive during these tasks and
		 * this causes us to cache a stale pg_dist_node OID. We'd actually expect
		 * all invalidations to arrive after obtaining a lock in LockCitusExtension.
		 */

		/*
		 * Perform Work. If a specific task needs to be called sooner than
		 * timeout indicates, it's ok to lower it to that value. Expensive
		 * tasks should do their own time math about whether to re-run checks.
		 */

#ifdef HAVE_LIBCURL
		if (EnableStatisticsCollection &&
			GetCurrentTimestamp() >= nextStatsCollectionTime)
		{
			bool statsCollectionSuccess = false;
			InvalidateMetadataSystemCache();
			StartTransactionCommand();

			/*
			 * Lock the extension such that it cannot be dropped or created
			 * concurrently. Skip statistics collection if citus extension is
			 * not accessible.
			 *
			 * Similarly, we skip statistics collection if there exists any
			 * version mismatch or the extension is not fully created yet.
			 */
			if (!LockCitusExtension())
			{
				ereport(DEBUG1, (errmsg("could not lock the citus extension, "
										"skipping statistics collection")));
			}
			else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
			{
				FlushDistTableCache();
				WarnIfSyncDNS();
				statsCollectionSuccess = CollectBasicUsageStatistics();
			}

			/*
			 * If statistics collection was successful the next collection is
			 * 24-hours later. Also, if this was a retry attempt we don't do
			 * any more retries until 24-hours later, so we limit number of
			 * retries to one.
			 */
			if (statsCollectionSuccess || retryStatsCollection)
			{
				nextStatsCollectionTime =
					TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
												STATS_COLLECTION_TIMEOUT_MILLIS);
				retryStatsCollection = false;
			}
			else
			{
				nextStatsCollectionTime =
					TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
												STATS_COLLECTION_RETRY_TIMEOUT_MILLIS);
				retryStatsCollection = true;
			}

			CommitTransactionCommand();
		}
#endif

		/* without a handle we treat the metadata sync worker as stopped */
		pid_t metadataSyncBgwPid = 0;
		BgwHandleStatus metadataSyncStatus =
			metadataSyncBgwHandle != NULL ?
			GetBackgroundWorkerPid(metadataSyncBgwHandle, &metadataSyncBgwPid) :
			BGWH_STOPPED;

		if (metadataSyncStatus != BGWH_STOPPED &&
			GetCurrentTimestamp() >= nextMetadataSyncTime)
		{
			/*
			 * Metadata sync is still running, recheck in a short while.
			 */
			int nextTimeout = MetadataSyncRetryInterval;
			nextMetadataSyncTime =
				TimestampTzPlusMilliseconds(GetCurrentTimestamp(), nextTimeout);
			timeout = Min(timeout, nextTimeout);
		}
		else if (!RecoveryInProgress() &&
				 metadataSyncStatus == BGWH_STOPPED &&
				 (MetadataSyncTriggeredCheckAndReset(myDbData) ||
				  GetCurrentTimestamp() >= nextMetadataSyncTime))
		{
			/* the previous sync worker is gone, forget about its handle */
			if (metadataSyncBgwHandle)
			{
				pfree(metadataSyncBgwHandle);
				metadataSyncBgwHandle = NULL;
			}

			InvalidateMetadataSystemCache();
			StartTransactionCommand();
			PushActiveSnapshot(GetTransactionSnapshot());

			int nextTimeout = MetadataSyncRetryInterval;
			bool syncMetadata = false;

			if (!LockCitusExtension())
			{
				ereport(DEBUG1, (errmsg("could not lock the citus extension, "
										"skipping metadata sync")));
			}
			else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
			{
				bool lockFailure = false;
				syncMetadata = ShouldInitiateMetadataSync(&lockFailure);

				/*
				 * If lock fails, we need to recheck in a short while. If we are
				 * going to sync metadata, we should recheck in a short while to
				 * see if it failed. Otherwise, we can wait longer.
				 */
				nextTimeout = (lockFailure || syncMetadata) ?
							  MetadataSyncRetryInterval :
							  MetadataSyncInterval;
			}

			PopActiveSnapshot();
			CommitTransactionCommand();

			if (syncMetadata)
			{
				metadataSyncBgwHandle =
					SpawnSyncNodeMetadataToNodes(MyDatabaseId, myDbData->userOid);
			}

			nextMetadataSyncTime =
				TimestampTzPlusMilliseconds(GetCurrentTimestamp(), nextTimeout);
			timeout = Min(timeout, nextTimeout);
		}

		/*
		 * If enabled, run 2PC recovery on primary nodes (where !RecoveryInProgress()),
		 * since we'll write to the pg_dist_transaction log.
		 */
		if (Recover2PCInterval > 0 && !RecoveryInProgress() &&
			TimestampDifferenceExceeds(lastRecoveryTime, GetCurrentTimestamp(),
									   Recover2PCInterval))
		{
			int recoveredTransactionCount = 0;

			InvalidateMetadataSystemCache();
			StartTransactionCommand();

			if (!LockCitusExtension())
			{
				ereport(DEBUG1, (errmsg("could not lock the citus extension, "
										"skipping 2PC recovery")));
			}
			else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
			{
				/*
				 * Record last recovery time at start to ensure we run once per
				 * Recover2PCInterval even if RecoverTwoPhaseCommits takes some time.
				 */
				lastRecoveryTime = GetCurrentTimestamp();

				recoveredTransactionCount = RecoverTwoPhaseCommits();
			}

			CommitTransactionCommand();

			if (recoveredTransactionCount > 0)
			{
				ereport(LOG, (errmsg("maintenance daemon recovered %d distributed "
									 "transactions",
									 recoveredTransactionCount)));
			}

			/* make sure we don't wait too long */
			timeout = Min(timeout, Recover2PCInterval);
		}

		/* the config value -1 disables the distributed deadlock detection  */
		if (DistributedDeadlockDetectionTimeoutFactor != -1.0)
		{
			double deadlockTimeout =
				DistributedDeadlockDetectionTimeoutFactor * (double) DeadlockTimeout;

			InvalidateMetadataSystemCache();
			StartTransactionCommand();

			/*
			 * We skip the deadlock detection if citus extension
			 * is not accessible.
			 *
			 * Similarly, we skip to run the deadlock checks if
			 * there exists any version mismatch or the extension
			 * is not fully created yet.
			 */
			if (!LockCitusExtension())
			{
				ereport(DEBUG1, (errmsg("could not lock the citus extension, "
										"skipping deadlock detection")));
			}
			else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
			{
				foundDeadlock = CheckForDistributedDeadlocks();
			}

			CommitTransactionCommand();

			/*
			 * If we find any deadlocks, run the distributed deadlock detection
			 * more often since it is quite possible that there are other
			 * deadlocks need to be resolved.
			 *
			 * Thus, we use 1/20 of the calculated value. With the default
			 * values (i.e., deadlock_timeout 1 seconds,
			 * citus.distributed_deadlock_detection_factor 2), we'd be able to cancel
			 * ~10 distributed deadlocks per second.
			 */
			if (foundDeadlock)
			{
				deadlockTimeout = deadlockTimeout / 20.0;
			}

			/* make sure we don't wait too long */
			timeout = Min(timeout, deadlockTimeout);
		}

		if (!RecoveryInProgress() && DeferShardDeleteInterval > 0 &&
			TimestampDifferenceExceeds(lastShardCleanTime, GetCurrentTimestamp(),
									   DeferShardDeleteInterval))
		{
			int numberOfDroppedResources = 0;

			InvalidateMetadataSystemCache();
			StartTransactionCommand();

			if (!LockCitusExtension())
			{
				ereport(DEBUG1, (errmsg(
									 "could not lock the citus extension, skipping shard cleaning")));
			}
			else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
			{
				/*
				 * Record last shard clean time at start to ensure we run once per
				 * DeferShardDeleteInterval.
				 */
				lastShardCleanTime = GetCurrentTimestamp();

				numberOfDroppedResources = TryDropOrphanedResources();
			}

			CommitTransactionCommand();

			if (numberOfDroppedResources > 0)
			{
				ereport(LOG, (errmsg("maintenance daemon dropped %d "
									 "resources previously marked to be removed",
									 numberOfDroppedResources)));
			}

			/* make sure we don't wait too long */
			timeout = Min(timeout, DeferShardDeleteInterval);
		}

		if (StatStatementsPurgeInterval > 0 &&
			StatStatementsTrack != STAT_STATEMENTS_TRACK_NONE &&
			TimestampDifferenceExceeds(lastStatStatementsPurgeTime, GetCurrentTimestamp(),
									   (StatStatementsPurgeInterval * 1000)))
		{
			StartTransactionCommand();

			if (!LockCitusExtension())
			{
				ereport(DEBUG1, (errmsg("could not lock the citus extension, "
										"skipping stat statements purging")));
			}
			else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
			{
				/*
				 * Record last time we perform the purge to ensure we run once per
				 * StatStatementsPurgeInterval.
				 */
				lastStatStatementsPurgeTime = GetCurrentTimestamp();

				CitusQueryStatsSynchronizeEntries();
			}

			CommitTransactionCommand();

			/* make sure we don't wait too long, need to convert seconds to milliseconds */
			timeout = Min(timeout, (StatStatementsPurgeInterval * 1000));
		}

		/* without a handle we treat the task queue monitor as stopped */
		pid_t backgroundTaskQueueWorkerPid = 0;
		BgwHandleStatus backgroundTaskQueueWorkerStatus =
			backgroundTasksQueueBgwHandle != NULL ? GetBackgroundWorkerPid(
				backgroundTasksQueueBgwHandle, &backgroundTaskQueueWorkerPid) :
			BGWH_STOPPED;
		if (!RecoveryInProgress() && BackgroundTaskQueueCheckInterval > 0 &&
			TimestampDifferenceExceeds(lastBackgroundTaskQueueCheck,
									   GetCurrentTimestamp(),
									   BackgroundTaskQueueCheckInterval) &&
			backgroundTaskQueueWorkerStatus == BGWH_STOPPED)
		{
			/* clear old background worker for task queue before checking for new tasks */
			if (backgroundTasksQueueBgwHandle)
			{
				pfree(backgroundTasksQueueBgwHandle);
				backgroundTasksQueueBgwHandle = NULL;
			}

			StartTransactionCommand();

			bool shouldStartBackgroundTaskQueueBackgroundWorker = false;
			if (!LockCitusExtension())
			{
				/*
				 * Name the task that is actually skipped here, so the debug
				 * output is not confused with the stat statements purge above.
				 */
				ereport(DEBUG1, (errmsg("could not lock the citus extension, "
										"skipping background task queue check")));
			}
			else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
			{
				/* perform catalog precheck */
				shouldStartBackgroundTaskQueueBackgroundWorker =
					HasRunnableBackgroundTask();
			}

			CommitTransactionCommand();

			if (shouldStartBackgroundTaskQueueBackgroundWorker)
			{
				/*
				 * Before we start the background worker we want to check if an orphaned
				 * one is still running. This could happen when the maintenance daemon
				 * restarted in a way where the background task queue monitor wasn't
				 * restarted.
				 *
				 * To check if an orphaned background task queue monitor is still running
				 * we quickly acquire the lock without waiting. If we can't acquire the
				 * lock this means that some other backed still has the lock. We prevent a
				 * new backend from starting and log a warning that we found that another
				 * process still holds the lock.
				 */
				LOCKTAG tag = { 0 };
				SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_BACKGROUND_TASK_MONITOR);
				const bool sessionLock = false;
				const bool dontWait = true;
				LockAcquireResult locked =
					LockAcquire(&tag, AccessExclusiveLock, sessionLock, dontWait);

				if (locked == LOCKACQUIRE_NOT_AVAIL)
				{
					/* warn only once per streak of failed attempts */
					if (!backgroundTasksQueueWarnedForLock)
					{
						ereport(WARNING, (errmsg("background task queue monitor already "
												 "held"),
										  errdetail("the background task queue monitor "
													"lock is held by another backend, "
													"indicating the maintenance daemon "
													"has lost track of an already "
													"running background task queue "
													"monitor, not starting a new one")));
						backgroundTasksQueueWarnedForLock = true;
					}
				}
				else
				{
					LockRelease(&tag, AccessExclusiveLock, sessionLock);

					/* we were able to acquire the lock, reset the warning tracker */
					backgroundTasksQueueWarnedForLock = false;

					/* spawn background worker */
					ereport(LOG, (errmsg("found scheduled background tasks, starting new "
										 "background task queue monitor")));

					backgroundTasksQueueBgwHandle =
						StartCitusBackgroundTaskQueueMonitor(MyDatabaseId,
															 myDbData->userOid);

					if (!backgroundTasksQueueBgwHandle ||
						GetBackgroundWorkerPid(backgroundTasksQueueBgwHandle,
											   &backgroundTaskQueueWorkerPid) ==
						BGWH_STOPPED)
					{
						ereport(WARNING, (errmsg("unable to start background worker for "
												 "background task execution")));
					}
				}
			}

			/* interval management */
			lastBackgroundTaskQueueCheck = GetCurrentTimestamp();
			timeout = Min(timeout, BackgroundTaskQueueCheckInterval);
		}

		/*
		 * Wait until timeout, or until somebody wakes us up. Also cast the timeout to
		 * integer where we've calculated it using double for not losing the precision.
		 */
		int rc = WaitLatch(MyLatch, latchFlags, (long) timeout, PG_WAIT_EXTENSION);

		/* emergency bailout if postmaster has died */
		if (rc & WL_POSTMASTER_DEATH)
		{
			proc_exit(1);
		}

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();

			/* check for changed configuration */
			if (myDbData->userOid != GetSessionUserId())
			{
				/* return code of 1 requests worker restart */
				proc_exit(1);
			}

			/*
			 * Could also add code checking whether extension still exists,
			 * but that'd complicate things a bit, because we'd have to delete
			 * the shared memory entry.  There'd potentially be a race
			 * condition where the extension gets re-created, checking that
			 * this entry still exists, and it getting deleted just after.
			 * Doesn't seem worth catering for that.
			 */
		}

		if (got_SIGHUP)
		{
			got_SIGHUP = false;
			ProcessConfigFile(PGC_SIGHUP);
		}
	}

	/* we are shutting down, take the metadata sync worker with us */
	if (metadataSyncBgwHandle)
	{
		TerminateBackgroundWorker(metadataSyncBgwHandle);
	}
}


/*
 * MaintenanceDaemonShmemSize returns the number of bytes of shared memory
 * the maintenance daemon infrastructure needs: the control structure plus
 * the estimated size of the per-database hash table.
 */
size_t
MaintenanceDaemonShmemSize(void)
{
	Size controlDataSize = add_size(0, sizeof(MaintenanceDaemonControlData));

	/*
	 * One hash-table entry per worker process is enough: we could not start
	 * more daemons than that anyway, so asking for more would be pointless.
	 */
	Size dbHashSize = hash_estimate_size(max_worker_processes,
										 sizeof(MaintenanceDaemonDBData));

	return add_size(controlDataSize, dbHashSize);
}


/*
 * MaintenanceDaemonShmemInit initializes the requested shared memory for the
 * maintenance daemon.
 *
 * It creates or attaches to the control structure and the per-database hash
 * table while holding AddinShmemInitLock, and afterwards calls the
 * shmem_startup_hook that was installed before ours, if any.
 */
void
MaintenanceDaemonShmemInit(void)
{
	/*
	 * Kept in a static array so the pointer handed to LWLockRegisterTranche
	 * stays valid for the lifetime of the process.
	 */
	static char trancheName[] = "Citus Maintenance Daemon";

	bool alreadyInitialized = false;
	HASHCTL hashInfo;

	LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);

	MaintenanceDaemonControl =
		(MaintenanceDaemonControlData *) ShmemInitStruct("Citus Maintenance Daemon",
														 MaintenanceDaemonShmemSize(),
														 &alreadyInitialized);

	/*
	 * Might already be initialized on EXEC_BACKEND type platforms that call
	 * shared library initialization functions in every backend.
	 */
	if (!alreadyInitialized)
	{
		MaintenanceDaemonControl->trancheId = LWLockNewTrancheId();
		MaintenanceDaemonControl->lockTrancheName = trancheName;

		LWLockInitialize(&MaintenanceDaemonControl->lock,
						 MaintenanceDaemonControl->trancheId);
	}

	/*
	 * Tranche name registration is local to a process, so it has to be done
	 * by every process that runs this function, not only by the one that
	 * created the shared structure. We deliberately pass our own copy of the
	 * name instead of the pointer stored in shared memory, since that pointer
	 * was written by another process and may not be valid in this one.
	 */
	LWLockRegisterTranche(MaintenanceDaemonControl->trancheId, trancheName);

	/* per-database hash table, keyed by database oid */
	memset(&hashInfo, 0, sizeof(hashInfo));
	hashInfo.keysize = sizeof(Oid);
	hashInfo.entrysize = sizeof(MaintenanceDaemonDBData);
	hashInfo.hash = tag_hash;
	int hashFlags = (HASH_ELEM | HASH_FUNCTION);

	MaintenanceDaemonDBHash = ShmemInitHash("Maintenance Database Hash",
											max_worker_processes, max_worker_processes,
											&hashInfo, hashFlags);

	LWLockRelease(AddinShmemInitLock);

	/* give previously installed hooks their turn */
	if (prev_shmem_startup_hook != NULL)
	{
		prev_shmem_startup_hook();
	}
}


/*
 * MaintenanceDaemonShmemExit is the before_shmem_exit handler of the
 * maintenance daemon. It unregisters the exiting daemon from its entry in
 * MaintenanceDaemonDBHash, but only if that entry still refers to this
 * process.
 *
 * arg carries the oid of the database the daemon was running on.
 */
static void
MaintenanceDaemonShmemExit(int code, Datum arg)
{
	Oid exitingDatabaseOid = DatumGetObjectId(arg);

	LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);

	/* the entry is gone (NULL) after StopMaintenanceDaemon */
	MaintenanceDaemonDBData *entry =
		(MaintenanceDaemonDBData *) hash_search(MaintenanceDaemonDBHash,
												&exitingDatabaseOid,
												HASH_FIND, NULL);

	/*
	 * Once the maintenance daemon fails (e.g., due to an error in the main
	 * loop), both Postgres tries to restart the failed daemon and Citus
	 * attempts to start a new one. In that case, the one started by Citus
	 * ends up here.
	 *
	 * If the system is slow enough (e.g., under valgrind testing), the entry
	 * may by now describe the daemon restarted by Postgres rather than us.
	 * Hence we only unregister ourselves while we are still the registered
	 * maintenance daemon.
	 */
	if (entry != NULL && entry->workerPid == MyProcPid)
	{
		entry->daemonStarted = false;
		entry->workerPid = 0;
	}
	else if (entry != NULL)
	{
		ereport(LOG, (errmsg(
						  "maintenance daemon for database %u has already been replaced by "
						  "Postgres, skipping to unregister this maintenance daemon",
						  exitingDatabaseOid)));
	}

	LWLockRelease(&MaintenanceDaemonControl->lock);
}


/*
 * MaintenanceDaemonSigTermHandler records that SIGTERM arrived by setting
 * got_SIGTERM and sets the process latch so that the main loop notices.
 * errno is preserved across the handler.
 */
static void
MaintenanceDaemonSigTermHandler(SIGNAL_ARGS)
{
	const int savedErrno = errno;

	got_SIGTERM = true;

	/* without a PGPROC there is no latch to set */
	if (MyProc != NULL)
	{
		SetLatch(&MyProc->procLatch);
	}

	errno = savedErrno;
}


/*
 * MaintenanceDaemonSigHupHandler records that SIGHUP arrived by setting
 * got_SIGHUP, so that the main loop re-reads the config file at the next
 * convenient time, and sets the process latch to wake the loop up.
 * errno is preserved across the handler.
 */
static void
MaintenanceDaemonSigHupHandler(SIGNAL_ARGS)
{
	const int savedErrno = errno;

	got_SIGHUP = true;

	/* without a PGPROC there is no latch to set */
	if (MyProc != NULL)
	{
		SetLatch(&MyProc->procLatch);
	}

	errno = savedErrno;
}


/*
 * MaintenanceDaemonErrorContext adds some context to log messages to make it
 * easier to associate them with the maintenance daemon.
 */
static void
MaintenanceDaemonErrorContext(void *arg)
{
	MaintenanceDaemonDBData *myDbData = (MaintenanceDaemonDBData *) arg;
	errcontext("Citus maintenance daemon for database %u user %u",
			   myDbData->databaseOid, myDbData->userOid);
}


/*
 * LockCitusExtension takes an AccessShareLock on the citus extension object.
 *
 * Returns true when the lock was taken and the extension still has the oid
 * we locked. Returns false when the extension does not exist, or when its
 * oid changed (or it disappeared) while we were acquiring the lock.
 */
bool
LockCitusExtension(void)
{
	Oid oidBeforeLock = get_extension_oid("citus", true);

	if (oidBeforeLock == InvalidOid)
	{
		/* there is no citus extension in this database */
		return false;
	}

	LockDatabaseObject(ExtensionRelationId, oidBeforeLock, 0, AccessShareLock);

	/*
	 * Between the lookup and the lock the extension could have been dropped,
	 * and possibly created again. Only report success if the lookup still
	 * yields the oid we have just locked.
	 */
	Oid oidAfterLock = get_extension_oid("citus", true);

	return oidAfterLock == oidBeforeLock;
}


/*
 * StopMaintenanceDaemon stops the maintenance daemon for the
 * given database and removes it from the maintenance daemon
 * control hash.
 *
 * If the database has an entry with a registered worker pid, that process is
 * sent SIGTERM after the entry has been removed and the control lock has
 * been released. If there is no entry, this is a no-op.
 */
void
StopMaintenanceDaemon(Oid databaseId)
{
	pid_t workerPid = 0;

	LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);

	/*
	 * Read the pid through a regular lookup before removing the entry. The
	 * pointer returned for HASH_REMOVE refers to an element that has already
	 * been given back to the hash table and must not be dereferenced.
	 */
	MaintenanceDaemonDBData *dbData = (MaintenanceDaemonDBData *) hash_search(
		MaintenanceDaemonDBHash,
		&databaseId,
		HASH_FIND, NULL);

	if (dbData != NULL)
	{
		workerPid = dbData->workerPid;

		hash_search(MaintenanceDaemonDBHash, &databaseId, HASH_REMOVE, NULL);
	}

	LWLockRelease(&MaintenanceDaemonControl->lock);

	/* a pid of 0 means no daemon is registered for the database */
	if (workerPid > 0)
	{
		kill(workerPid, SIGTERM);
	}
}


/*
 * TriggerNodeMetadataSync triggers the maintenance daemon to do
 * a node metadata sync for the given database.
 *
 * It sets the triggerNodeMetadataSync flag in the database's hash entry and
 * wakes up the daemon through its latch, if one has been published. If the
 * database has no entry, this is a no-op.
 */
void
TriggerNodeMetadataSync(Oid databaseId)
{
	bool found = false;

	LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);

	MaintenanceDaemonDBData *dbData = (MaintenanceDaemonDBData *) hash_search(
		MaintenanceDaemonDBHash,
		&databaseId,
		HASH_FIND, &found);
	if (found)
	{
		dbData->triggerNodeMetadataSync = true;

		/*
		 * Set latch to wake-up the maintenance loop. Entries are created
		 * zeroed and the latch is only filled in once the daemon runs
		 * ConnectToDatabase, so it can still be NULL here. In that case the
		 * flag stays set and is picked up by the daemon's main loop.
		 */
		if (dbData->latch != NULL)
		{
			SetLatch(dbData->latch);
		}
	}

	LWLockRelease(&MaintenanceDaemonControl->lock);
}


/*
 * MetadataSyncTriggeredCheckAndReset reports whether a metadata sync has been
 * requested through the given hash entry and clears the request. Reading and
 * clearing the flag both happen under the exclusive control lock.
 */
static bool
MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData)
{
	bool wasTriggered = false;

	LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);

	/* consume the request: remember it, then reset the flag */
	wasTriggered = dbData->triggerNodeMetadataSync;
	dbData->triggerNodeMetadataSync = false;

	LWLockRelease(&MaintenanceDaemonControl->lock);

	return wasTriggered;
}
